package com.pie4cloud.pie.dmz.handler;

import com.alibaba.fastjson.JSONObject;
import com.pie4cloud.pie.dmz.entty.FileInfo;
import com.pie4cloud.pie.dmz.entty.constant.ConstantValue;
import com.pie4cloud.pie.dmz.entty.enums.MsgTypeEnum;
import com.pie4cloud.pie.dmz.entty.netty.Message;
import com.pie4cloud.pie.dmz.entty.result.Result;
import com.pie4cloud.pie.dmz.util.FileUtil;
import com.pie4cloud.pie.dmz.util.FtpClient;
import com.pie4cloud.pie.job.api.entity.ConfigFtp;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Date;

/**
 * Server-side Netty handler for the chunked file transfer protocol.
 *
 * <p>Each inbound {@link Message} carries a JSON-encoded {@link FileInfo} in its body.
 * {@code UPLOAD} messages deliver one block of a file which is stored on disk and, once all
 * blocks are present, merged and pushed to an FTP server. {@code DOWNlOAD} messages request
 * one block of a local file, which is read and sent back.
 *
 * <p>The handler keeps no per-connection state in fields.
 */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	/**
	 * Logs the remote IP address of a newly connected client.
	 *
	 * @param ctx channel context of the new connection
	 * @throws Exception if the remote address cannot be resolved
	 * @author fanml
	 * @date 2021/7/4 17:48
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
		String clientIp = remote.getAddress().getHostAddress();
		System.out.println("客户端IP" + clientIp + new Date());
	}

	/**
	 * Dispatches an inbound {@link Message} to the upload or download logic.
	 *
	 * <p>Objects that are not a {@link Message} are released and dropped. Exceptions raised
	 * while decoding the request are printed and not propagated.
	 *
	 * @param ctx channel context used to write the reply
	 * @param msg inbound object from the pipeline
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (!(msg instanceof Message)) {
			// This handler consumes the object without forwarding it, so it must release
			// it; otherwise a reference-counted message (e.g. a buffer) would leak.
			ReferenceCountUtil.release(msg);
			return;
		}
		Message message = (Message) msg;
		try {
			// Message type
			MsgTypeEnum msgTypeEnum = message.getMsgTypeEnum();
			// The body is the UTF-8 JSON form of the file descriptor
			String json = new String(message.getMsgBody(), CharsetUtil.UTF_8);
			FileInfo fileInfo = JSONObject.parseObject(json, FileInfo.class);
			if (MsgTypeEnum.UPLOAD == msgTypeEnum) {
				handleUpload(ctx, message, fileInfo);
			} else if (MsgTypeEnum.DOWNlOAD == msgTypeEnum) {
				handleDownload(ctx, message, fileInfo);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			ReferenceCountUtil.release(message);
		}
	}

	/**
	 * Stores the uploaded block and replies with either the completion result or the number
	 * of the next block to send. On failure the request message is sent back with code -1.
	 */
	private void handleUpload(ChannelHandlerContext ctx, Message message, FileInfo fileInfo) {
		try {
			int count = merageFile(fileInfo, fileInfo.getDmzPath());
			if (fileInfo.getBlockCount() == count - 1) {
				// merageFile returned blockCount + 1: the whole file has been received
				Result<JSONObject> success = Result.success();
				message.setMsgTypeEnum(MsgTypeEnum.RESPONSE);
				message.setMsgBody(JSONObject.toJSONString(success).getBytes(CharsetUtil.UTF_8));
			} else {
				// Tell the client which block number is expected next
				Result<Integer> success = Result.success(count);
				message.setMsgTypeEnum(MsgTypeEnum.UPLOAD);
				message.setMsgBody(JSONObject.toJSONString(success).getBytes(CharsetUtil.UTF_8));
			}
		} catch (Exception e) {
			// Mark the reply as failed; type and body are left as they were when the
			// failure happened.
			message.setCode(-1);
			e.printStackTrace();
		}
		// Always answer, so that the client is not left waiting after a failure.
		ctx.writeAndFlush(message);
	}

	/**
	 * Reads the requested block and sends it; sends an empty message with code -1 when
	 * {@link #download(FileInfo, String)} yields no block.
	 */
	private void handleDownload(ChannelHandlerContext ctx, Message message, FileInfo fileInfo) throws IOException {
		Message download = download(fileInfo, message.getSessionId());
		if (download == null) {
			download = new Message();
			// The error code belongs on the reply that is actually written.
			download.setCode(-1);
		}
		ctx.writeAndFlush(download);
	}

	/**
	 * Prints the failure and closes the connection.
	 *
	 * @param ctx   channel context of the failing connection
	 * @param cause the exception that reached this handler
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("发生异常了----------------------------");
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Stores one uploaded block and merges the file once every block is present.
	 *
	 * <p>Blocks are kept under {@code <dmzPath>/<md5FileName>/<blockIndex>/}. The number of
	 * entries in {@code <dmzPath>/<md5FileName>/} is used as the count of blocks received so
	 * far, which is how an interrupted upload is resumed. After the merge the resulting file
	 * is uploaded to the FTP server described by {@code fileInfo}.
	 *
	 * <p>NOTE(review): {@code dmzPath}, {@code md5FileName} and {@code fileName} come from the
	 * request and are used to build file system paths without any restriction to a base
	 * directory - confirm that clients are trusted or add a path check.
	 *
	 * @param fileInfo   descriptor and content of the uploaded block
	 * @param outDirPath directory that receives the merged file
	 * @return {@code blockCount + 1} when the file is complete and uploaded, otherwise the
	 *         number of the block the client should send next ({@code blockIndex} itself when
	 *         the content checksum does not match)
	 * @throws IllegalArgumentException if the block content or its checksum is missing
	 * @throws IOException              if the block directory cannot be listed, the block
	 *                                  cannot be written, or the merged file is missing
	 * @throws Exception                anything raised by the merge or the FTP upload
	 * @author fanml
	 * @date 2021/7/4 17:50
	 */
	private int merageFile(FileInfo fileInfo, String outDirPath) throws Exception {
		// File name
		String fileName = fileInfo.getFileName();
		// MD5 of the file name, used as the name of the per-file block directory
		String md5FileName = fileInfo.getMd5FileName();
		// Total number of blocks
		int blockCount = fileInfo.getBlockCount();
		// Number of the block carried by this message
		int blockIndex = fileInfo.getBlockIndex();
		// Block content and its expected checksum
		byte[] content = fileInfo.getContent();
		String md5Content = fileInfo.getMd5Content();
		System.out.println("处理块号 " + blockIndex + "/" + blockCount);
		if (content == null || md5Content == null) {
			throw new IllegalArgumentException("block " + blockIndex + " has no content or no checksum");
		}
		if (!md5Content.equals(DigestUtils.md5Hex(content))) {
			// Checksum mismatch: ask for the same block again
			return blockIndex;
		}

		// Directory that holds all blocks of this file
		if (!fileInfo.getDmzPath().endsWith("/")) {
			fileInfo.setDmzPath(fileInfo.getDmzPath() + "/");
		}
		String fileDirPath = fileInfo.getDmzPath() + md5FileName + File.separator;
		File topDir = new File(fileDirPath);
		if (!topDir.exists()) {
			topDir.mkdirs();
		}
		// Entries already present, i.e. blocks received so far
		File[] files = topDir.listFiles();
		if (files == null) {
			// listFiles() yields null when the path is not a directory or cannot be read
			throw new IOException("cannot list block directory " + fileDirPath);
		}
		int received = files.length;
		// Directory of the current block
		String fileDirIndexPath = fileDirPath + blockIndex + File.separator;

		// True when the last block is already stored completely and need not be rewritten
		boolean flag = false;
		boolean lastDir = false;
		if (received > blockIndex && received < blockCount) {
			// Resuming: later blocks than the current one already exist
			File latestDir = new File(fileDirPath + received);
			File[] latestParts = latestDir.listFiles();
			if (latestParts != null && latestParts.length > 0) {
				if (latestParts[0].length() == ConstantValue.BLOCK_SIZE) {
					return received + 1;
				}
				// NOTE(review): a part of unexpected size falls through here and the
				// current block is stored again - confirm this is intended.
			} else {
				System.out.println("续传文件已损坏");
				FileUtils.deleteDirectory(latestDir);
				return received;
			}
		} else if (received > blockIndex && received == blockCount) {
			// Every block directory exists: request the last block
			return blockCount;
		} else if (received == blockIndex && blockIndex == blockCount) {
			// The current block is the last one and its directory already exists
			lastDir = true;
			File[] lastParts = new File(fileDirIndexPath).listFiles();
			if (lastParts != null && lastParts.length > 0) {
				// Keep the stored part only if it has the full length
				flag = lastParts[0].length() == content.length;
				if (!flag) {
					lastParts[0].delete();
				}
			}
		}

		// Part file name: <fileName>.<zero padded block number>.part
		String partName = fileName + "." + String.format("%0" + String.valueOf(blockCount).length() + "d", blockIndex) + ".part";
		File file = new File(fileDirIndexPath, partName);
		if (!file.exists()) {
			new File(fileDirIndexPath).mkdirs();
		}
		if (!flag) {
			// Store the block; the stream is closed even when the write fails
			try (FileOutputStream out = new FileOutputStream(file)) {
				out.write(content);
			}
		}

		if (lastDir || received + 1 == blockCount) {
			// All blocks present: merge them
			FileUtil.mergeDirFile(fileDirPath, outDirPath);
			File fileUp = new File(outDirPath + "/" + fileInfo.getFileName());
			// Verify the merged file (not the part file) before connecting to the FTP server
			if (!fileUp.exists()) {
				throw new IOException("文件合并出错");
			}
			ConfigFtp configFtp = new ConfigFtp();
			configFtp.setFtpId(fileInfo.getFtpId());
			configFtp.setFtpAddress(fileInfo.getFtpAddress());
			configFtp.setFtpPort(fileInfo.getFtpPort());
			configFtp.setFtpUserName(fileInfo.getFtpUserName());
			configFtp.setFtpPwd(fileInfo.getFtpPwd());
			FtpClient ftpClient = new FtpClient().getFtpClient(configFtp);
			ftpClient.uploadFiles(fileUp, fileInfo.getTargetPath(), fileInfo.getTargetFileName());
			return blockCount + 1;
		}
		// Request the next block
		return blockIndex + 1;
	}


	/**
	 * Reads one block of the requested file and wraps it in a download message.
	 *
	 * <p>The block count and file length are written into {@code fileInfo}; a block index of
	 * 0 is treated as a request for block 1.
	 *
	 * <p>NOTE(review): the file path is taken from the request as is - confirm that clients
	 * are trusted or restrict it to a base directory.
	 *
	 * @param fileInfo descriptor naming the file and the block to read
	 * @param id       session id to put on the reply
	 * @return the message carrying the block, or {@code null} when the file does not exist or
	 *         the requested block number is outside {@code 1..blockCount}
	 * @throws IOException if the file cannot be read
	 * @author fanml
	 * @date 2021/7/4 18:16
	 */
	private Message download(FileInfo fileInfo,String id) throws IOException {
		String filePath = fileInfo.getFilePath();
		if (filePath == null) {
			return null;
		}
		File file = new File(filePath);
		if (!file.exists()) {
			return null;
		}
		long fileLength = file.length();
		// Total number of blocks, rounding up for a trailing partial block
		int blockCount = (int) (fileLength % ConstantValue.BLOCK_SIZE == 0 ? fileLength / ConstantValue.BLOCK_SIZE : fileLength / ConstantValue.BLOCK_SIZE + 1);
		fileInfo.setBlockCount(blockCount);
		fileInfo.setFileLength(fileLength);
		int blockIndex = fileInfo.getBlockIndex();
		if (0 == blockIndex) {
			blockIndex = 1;
			fileInfo.setBlockIndex(blockIndex);
		}
		if (blockIndex < 1 || blockIndex > blockCount) {
			// Nothing to read at this position (this also covers an empty file)
			return null;
		}
		// Start offset of the block; widened first so large files cannot overflow int
		long position = (long) (blockIndex - 1) * ConstantValue.BLOCK_SIZE;
		// Block length: the last block holds whatever remains
		long size = blockIndex == blockCount ? fileLength - position : ConstantValue.BLOCK_SIZE;
		byte[] fileBlockByte = new byte[(int) size];
		// Plain positional read; the file handle is closed as soon as the block is loaded
		try (RandomAccessFile reader = new RandomAccessFile(file, "r")) {
			reader.seek(position);
			reader.readFully(fileBlockByte);
		}
		fileInfo.setContent(fileBlockByte);
		String jsonString = JSONObject.toJSONString(fileInfo);
		return new Message(id, MsgTypeEnum.DOWNlOAD, jsonString.getBytes(CharsetUtil.UTF_8));
	}

}
